Setup

# Load packages
# parallel ships with base R, so library() is sufficient; pacman is not needed.
library(parallel)

# Knit settings
knitr::opts_chunk$set(echo = TRUE, message = FALSE, comment = NA, warning = FALSE,
                      tidy = TRUE, results = "hold", cache = FALSE)

# Count threads: detectCores() counts logical CPUs (hardware threads) by
# default, not physical cores. It returns NA when the count cannot be
# determined, so fall back to a single worker in that case.
noquote("Number of threads")
numCores <- detectCores()
if (is.na(numCores)) {
  numCores <- 1L
}
numCores
[1] Number of threads
[1] 8

lapply()

To use the parallel package, it helps to know what I will call "lapply longhand": defining a named function first and then passing it to lapply, rather than writing the function inline.

# lapply with one input: paste() is handed each element of the sequence alone
noquote("lapply with one input")
lapply(seq_len(3L), paste)


# lapply shorthand: the function is written inline as an anonymous function
noquote("lapply shorthand")
lapply(seq_len(3L), function(i) paste("task", i))


# lapply longhand: name the function first, then hand it to lapply
noquote("lapply longhand")

## named worker: prefixes its argument with "task"
task_func <- function(x) paste("task", x)

## pass the named function to lapply
lapply(seq_len(3L), task_func)
[1] lapply with one input
[[1]]
[1] "1"

[[2]]
[1] "2"

[[3]]
[1] "3"

[1] lapply shorthand
[[1]]
[1] "task 1"

[[2]]
[1] "task 2"

[[3]]
[1] "task 3"

[1] lapply longhand
[[1]]
[1] "task 1"

[[2]]
[1] "task 2"

[[3]]
[1] "task 3"

Non-parallel

Let’s have R sleep on a single thread for 24 seconds. The elapsed time reflects the total task time plus any overhead the computer naturally adds.

# Time one 24-second sleep on the main R process (a single thread)
save1 <- system.time({
  Sys.sleep(24)
})

# Report the timing
noquote("Time in seconds")
save1
[1] Time in seconds
   user  system elapsed 
   0.00    0.00   24.42 

Parallel on even tasks

Let’s have R sleep for 24 seconds in total, but divide the 24 seconds equally across the 8 threads (24 / numCores seconds each in general), so that each thread sleeps for 3 seconds.

# Define cluster: one worker per logical CPU
cl <- parallel::makeCluster(numCores)

# Sleep for 24 seconds in total, split evenly: each worker sleeps
# 24 / numCores seconds (3 sec on an 8-thread machine).
# The finally clause stops the cluster even if parLapply() fails,
# so worker processes are not left running.
save2 <- tryCatch(
  system.time(
    parallel::parLapply(cl,
                        rep(24 / numCores, numCores),
                        Sys.sleep)
  ),
  finally = parallel::stopCluster(cl)
)

# Display time
noquote("Time in seconds")
save2
[1] Time in seconds
   user  system elapsed 
   0.00    0.00    3.07 

Parallel on unequal tasks

Previously, the 24 seconds were divided evenly on 8 threads.

Here, we will have unequal tasks that take 10 sec, 5 sec, and six 1.5 sec tasks.

By default, parallel processing will take at least as long as the longest task (10 sec).

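Using an if statement, we can split the longest task into 7 parts and spread them over every thread except the one running the second-longest task (5 sec). One part stays on the longest task's own thread, and each of the six 1.5 sec tasks picks up another. This brings the total time down to roughly the length of the second-longest task, which is now the new lower bound.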

Default to the longest task

# Define cluster: one worker per logical CPU
cl <- parallel::makeCluster(numCores)

# Sleep for 24 seconds in total, split unequally: one 10-sec task, one
# 5-sec task and six 1.5-sec tasks, so the run cannot finish before the
# 10-sec task does. NOTE: assumes numCores >= 8; with fewer workers
# parLapply() splits the 8 tasks into fewer chunks and the timing changes.
# The finally clause stops the cluster even if parLapply() fails.
save3 <- tryCatch(
  system.time(
    parallel::parLapply(cl,
                        c(10,          # 10 sec, 1x
                          5,           # 5 sec, 1x
                          rep(1.5, 6)  # 1.5 sec, 6x
                        ),
                        Sys.sleep)
  ),
  finally = parallel::stopCluster(cl)
)

# Display time
noquote("Time in seconds")
save3
[1] Time in seconds
   user  system elapsed 
   0.00    0.00   10.18 

More customized with if statements

# Sleep for one task's share of the work, redistributing the longest task
# so that it no longer dictates the total run time.
#
# The longest task (long_task sec) is cut into n_parts equal pieces. Its own
# thread keeps one piece, and every other task except the second-longest
# adds one piece to its own length. The second-longest task (second_task sec)
# is left unchanged. This assumes the remaining tasks are exactly the
# n_parts - 1 shortest ones.
# With the defaults (10 sec, 5 sec, 7 parts) and six 1.5-sec tasks, each short
# thread sleeps 1.5 + 10/7 (about 2.93) sec, so the 5-sec task bounds the run.
#
# x           task length in seconds (one element of the vector given to parLapply)
# long_task   length of the longest task; its thread sleeps long_task / n_parts
# second_task length of the second-longest task; slept unchanged
# n_parts     number of pieces the longest task is split into
# sleep_fn    function called with the computed duration. Defaults to
#             Sys.sleep; pass e.g. identity to inspect durations without waiting.
#
# Returns whatever sleep_fn returns (invisible NULL for Sys.sleep).
if_func <- function(x, long_task = 10, second_task = 5, n_parts = 7,
                    sleep_fn = Sys.sleep) {
  # Share of the longest task carried by each of the n_parts threads
  piece <- long_task / n_parts

  # all.equal() instead of == avoids exact floating-point comparison
  if (isTRUE(all.equal(x, long_task))) {
    # Longest task: keep only one piece on this thread
    sleep_fn(piece)
  } else if (isTRUE(all.equal(x, second_task))) {
    # Second-longest task: keep it the same length
    sleep_fn(second_task)
  } else {
    # Shortest tasks: each also picks up one piece of the longest task
    sleep_fn(x + piece)
  }
}

# Define cluster: one worker per logical CPU
cl <- parallel::makeCluster(numCores)

# Sleep for 24 seconds in total with the same unequal tasks as before, but
# let if_func() spread the 10-sec task over the other threads so that the
# 5-sec task bounds the run. NOTE: assumes numCores >= 8 so that each task
# gets its own worker. The finally clause stops the cluster even if
# parLapply() fails.
save4 <- tryCatch(
  system.time(
    parallel::parLapply(cl,
                        c(10,          # 10 sec, 1x
                          5,           # 5 sec, 1x
                          rep(1.5, 6)  # 1.5 sec, 6x
                        ),
                        if_func)
  ),
  finally = parallel::stopCluster(cl)
)

# Display time
noquote("Time in seconds")
save4
[1] Time in seconds
   user  system elapsed 
   0.00    0.00    5.08 

Conclusion

We had a task that took 24 seconds.

Time 1

Run on a single thread, it took that time plus the normal overhead, for a total of 24.42 seconds.

Time 2

Dividing the task evenly across 8 threads took a total of 3.07 seconds.

Time 3

For unequal tasks (10s, 5s, six 1.5s), simply using the parallel package lets the short tasks finish first and then sit idle until the longest task finishes, for a total of 10.18 seconds.

Time 4

For unequal tasks (10s, 5s, six 1.5s), using a custom if statement that spreads the longest task across the other threads brought the total down to 5.08 seconds.